# =============================================================================
# This code is intended to show how the sample illustrating the Safegraph data was created.
#   A sufficient subset of code used to clean data is provided. Code was used to read and organize the full safegraph data that will not be provided. 
#   The raw data is proprietary and will not be provided. 
#   The output of this code is provided.
#   
#   NOTE: THE PATTERN DATA IS A RANDOMIZED VERSION OF A SLICE FROM A SINGLE MONTH. ANALYSIS WITH THIS SAMPLE IS NOT FEASIBLE AND RESULTS WILL NOT BE INTERPRETABLE.
# =============================================================================



# =============================================================================
# Custom Functions
def downsize(df,types):
    for var in list(set(list(df)) & set(list(types))):
        #print(var)
        if not isnan2(types[var]):
            if types[var]=='float':
                df[var]=pd.to_numeric(df[var],downcast=types[var],errors='coerce')
            if types[var]=='float_nd':
                df[var]=pd.to_numeric(df[var],errors='coerce')
            elif types[var]=='integer':
                df[var].replace(regex=True,inplace=True,to_replace=r'-',value=r'')
                df[var]=pd.to_numeric(df[var],downcast=types[var],errors='coerce')
            elif types[var]=='integer_nd':
                df[var].replace(regex=True,inplace=True,to_replace=r'-',value=r'')
                df[var]=pd.to_numeric(df[var],errors='coerce')
            elif types[var]=='cat':
                df[var]=df[var].astype('category')
            elif types[var]=='date':
                df[var]=pd.to_datetime(df[var], format='%Y-%m-%d',errors='coerce')
            elif types[var]=='date_comp':
                df[var]=pd.to_datetime(df[var])
            elif types[var]=='dateym':
                df[var]=pd.to_datetime(df[var], format='%Y-%m',errors='coerce')
            elif types[var]=='date_safe':
                #Create a dictionary of the date values and then use replace to adjust the column
                try:
                    temp={d:dt.datetime.fromtimestamp(d, dt.timezone.utc) for d in df[var].unique()}
                except:
                    pass
                try:
                    temp={d:dt.datetime.fromtimestamp(d/1000, tz=dt.timezone.utc) for d in df[var].unique()}
                except:
                    pass
                df[var]=df[var].replace(temp)
    return df



# =============================================================================
# # Code to create raw and cleaned samples representing cell level randomized samples of Safegraph data.
# =============================================================================

#Read in the normalization panel that will adjust for the changing Safegraph device population over time.
norm_raw = pd.read_csv(r'D:\jodo_emga\Plasma\Data\Safegraph\RawData\PatternsMonthly\normalization_stats\2022\04\05\20\normalization_stats.csv',encoding='latin1')
norm_raw_sample = resample(df=norm_raw,n=10000)
norm_raw_sample.to_parquet(cdd['p_d_sg_sample'] + r'/norm_raw_sample.parquet')
#clean the normalization dataset
norm_cleaned_sample = norm_raw_sample.copy().rename(columns={'region':'state'})
norm_cleaned_sample = norm_cleaned_sample.groupby(['year','month','state'])[['total_visits', 'total_devices_seen', 'total_home_visits', 'total_home_visitors']].mean().reset_index()
norm_cleaned_sample['state'] = norm_cleaned_sample['state'].str.upper()
zip_pop = pd.read_parquet(cdd['p_d_acs_zip']+r'\acs_2014_2022_s3.parquet')
pop_st = zip_pop.groupby('state')['population'].sum().reset_index()
state_codes = {'WA': '53', 'DE': '10', 'DC': '11', 'WI': '55', 'WV': '54', 'HI': '15',
                'FL': '12', 'WY': '56', 'PR': '72', 'NJ': '34', 'NM': '35', 'TX': '48',
                'LA': '22', 'NC': '37', 'ND': '38', 'NE': '31', 'TN': '47', 'NY': '36',
                'PA': '42', 'AK': '02', 'NV': '32', 'NH': '33', 'VA': '51', 'CO': '08',
                'CA': '06', 'AL': '01', 'AR': '05', 'VT': '50', 'IL': '17', 'GA': '13',
                'IN': '18', 'IA': '19', 'MA': '25', 'AZ': '04', 'ID': '16', 'CT': '09',
                'ME': '23', 'MD': '24', 'OK': '40', 'OH': '39', 'UT': '49', 'MO': '29',
                'MN': '27', 'MI': '26', 'RI': '44', 'KS': '20', 'MT': '30', 'MS': '28',
                'SC': '45', 'KY': '21', 'OR': '41', 'SD': '46', "AS": "60", "MP": "69",
                "VI": "78", "GU": "66"}
state_codes2 = {int(state_codes[s]):s for s in state_codes.keys()}
pop_st['state'] = pop_st['state'].map(state_codes2)
norm_cleaned_sample = pd.merge(norm_cleaned_sample,pop_st,how='inner',on='state')
norm_cleaned_sample['nfactor'] = (norm_cleaned_sample['population'] / norm_cleaned_sample['total_devices_seen']).clip(upper=50)
norm_cleaned_sample['date'] = pd.to_datetime(norm_cleaned_sample['year'].astype(str)+norm_cleaned_sample['month'].astype(str).apply(lambda x: x.zfill(2))+'01',format='%Y%m%d')
norm_cleaned_sample['date'] = norm_cleaned_sample['date'].apply(lambda x: (x+relativedelta(months=+1))+relativedelta(days=-1))
norm_cleaned_sample.to_parquet(cdd['p_d_sg_sample'] + r'/norm_cleaned_sample.parquet')
#We merge norm with pattern data and define the normalized pattern data (e.g., normalized visists called visitsn = visits * nfactor)

#Read in Safegraph core dataset
core_raw = pd.read_csv(r'D:\jodo_emga\Plasma\Data\Safegraph\RawData\Core\core_poi\2021\11\04\18\core_poi-part3.csv.gz',compression='gzip',sep=',',encoding='latin1')
core_raw_sample = resample(df=core_raw,n=100000)
core_raw_sample.to_parquet(cdd['p_d_sg_sample'] + r'/core_raw_sample.parquet')
#Clean core data
core_dtype = pd.read_excel(cdd['p_c'] + r'\3_Analysis\Safegraph_Variables.xlsx',sheet_name='Core')
core_ddict = dict(zip(core_dtype['variable'].to_list(),core_dtype['new_type'].to_list()))
core_rndict = dict(zip(core_dtype['variable'].to_list(),core_dtype['new_name'].to_list()))
core_cleaned_sample=downsize(df=core_raw_sample,types=core_ddict).rename(columns=core_rndict)
core_cleaned_sample.to_parquet(cdd['p_d_sg_sample'] + r'/core_cleaned_sample.parquet')
#We used text queries to mark plasma centers in safegraph data and also completed a manual review to verify matches.


#Read in Safegraph pattern dataset
pattern_raw = pd.read_csv(r'D:\jodo_emga\Plasma\Data\Safegraph\RawData\PatternsMonthly\patterns\2022\04\05\20\patterns-part1.csv.gz',compression='gzip',sep=',',encoding='latin1',low_memory=False)
pattern_raw_sample = resample(df=pattern_raw,n=100000)
pattern_raw_sample.to_parquet(cdd['p_d_sg_sample'] + r'/pattern_raw_sample.parquet')
pm_dtype = pd.read_excel(cdd['p_c']+r'\3_Analysis\Safegraph_Variables.xlsx',sheet_name='MonthlyPat')
#Clean pattern data
pm_rndict = dict(zip(pm_dtype['variable'].to_list(),pm_dtype['new_name'].to_list()))
pm_ddict = dict(zip(pm_dtype['new_name'].to_list(),pm_dtype['new_type'].to_list()))
pattern_cleaned_sample=downsize(pattern_raw_sample.copy(),pm_ddict).rename(columns=pm_rndict)
pattern_cleaned_sample['cbg'] = pd.to_numeric(pattern_cleaned_sample['cbg'].astype(str).apply(lambda x: x.split(':')[1] if any(re.findall(r':',x)) else x),errors='coerce')
pattern_cleaned_sample['date_e'] = pd.to_datetime(pattern_cleaned_sample['date_e'].str[:10],format='%Y-%m-%d') - dt.timedelta(days=1)
pattern_cleaned_sample.to_parquet(cdd['p_d_sg_sample'] + r'/pattern_cleaned_sample.parquet')






















